package com.flink.examples.mysql;


import com.flink.examples.TCount;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @Description 将Flink的流中数据写入到MySQL数据源中
 */
public class JdbcWriter extends RichSinkFunction<TCount>{
    private Connection connection;
    private PreparedStatement preparedStatement;

    /**
     * job开始执行，调用此方法创建jdbc数据源连接对象
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 加载JDBC驱动
        Class.forName(MysqlConfig.DRIVER_CLASS);
        // 获取数据库连接
        connection = DriverManager.getConnection(MysqlConfig.SOURCE_DRIVER_URL,MysqlConfig.SOURCE_USER,MysqlConfig.SOURCE_PASSWORD);
        preparedStatement = connection.prepareStatement("insert into t_count(sex, num) values (?,?)");
    }

    /**
     * job执行完毕后，调用此方法关闭jdbc连接源
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
    }

    /**
     * 此方法实现接口中的invoke，在DataStream数据流中的每一条记录均会调用本方法执行数据同步
     * @param count
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(TCount count, Context context) throws Exception {
        try {
            //获取JdbcReader发送过来的结果
            preparedStatement.setInt(1, count.getSex());
            preparedStatement.setInt(2, count.getNum());
            preparedStatement.executeUpdate();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
